package com.dbflow5.rx2.query;

import com.dbflow5.BaseUnitTest;
import com.dbflow5.TestDatabase;
import com.dbflow5.config.FlowManager;
import com.dbflow5.models.Author_Table;
import com.dbflow5.models.Blog_Table;
import com.dbflow5.models.ForeignKeyModels;
import com.dbflow5.models.SimpleModel_Table;
import com.dbflow5.models.SimpleTestModels.SimpleModel;
import com.dbflow5.query.Method;
import com.dbflow5.query.ModelQueriable;
import com.dbflow5.query.Operator;
import com.dbflow5.query.SQLite;
import com.dbflow5.reactivestreams.transaction.TransactionObservable;
import com.dbflow5.structure.Model;

import io.reactivex.rxjava3.functions.Consumer;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.disposables.Disposable;

import static org.junit.Assert.assertEquals;

public class RXFlowableTest extends BaseUnitTest {
    @Test
    public void testCanObserveChanges() {
        FlowManager.databaseForTable(SimpleModel.class, dbFlowDatabase -> {
            for (int i = 0; i <= 100; i++) {
                Model.save(SimpleModel.class, new SimpleModel(String.valueOf(i)), dbFlowDatabase);
            }

            AtomicReference<List<SimpleModel>> list = new AtomicReference<>(new ArrayList<>());

            AtomicInteger triggerCount = new AtomicInteger();

            Flowable<List<SimpleModel>> flowable = TransactionObservable.asFlowable(SQLite.select().from(SimpleModel.class)
                .where(Method.cast(SimpleModel_Table.name).asInteger().greaterThan(50)), ModelQueriable::queryList);

            Disposable subscribe = flowable.subscribe(simpleModels -> {
                list.set(simpleModels);
                triggerCount.addAndGet(1);
            });
            assertEquals(50, list.get().size());
            subscribe.dispose();

            Model.save(SimpleModel.class, new SimpleModel("should not trigger"), dbFlowDatabase);
            assertEquals(1, triggerCount.get());
            return null;
        });
    }

    @Test
    public void testObservesJoinTables() {
        FlowManager.database(TestDatabase.class, db -> {
            Operator<?> joinOn = Blog_Table.name.withTable().eq(Author_Table.first_name.withTable().plus(" ").plus(Author_Table.last_name.withTable()));
            assertEquals("Blog.name=Author.first_name+' '+Author.last_name", joinOn.getQuery());

            final List<ForeignKeyModels.Blog>[] list = new List[]{new ArrayList<>()};
            final int[] calls = {0};
            TransactionObservable.asFlowable(SQLite.select()
                    .from(ForeignKeyModels.Blog.class)
                    .leftOuterJoin(ForeignKeyModels.Author.class)
                    .on(joinOn), ModelQueriable::queryList)
                    .subscribe(blogs -> {
                        calls[0]++;
                        list[0] = blogs;
                    });

            List<ForeignKeyModels.Author> authors = new ArrayList<>();
            for(int i=1;i<11;i++) {
                authors.add(new ForeignKeyModels.Author(i, i+"name", i+ "last"));
            }
            db.executeTransactionFunc(d -> {
                for (int index=1;index<11;index++){
                    Model.save(ForeignKeyModels.Blog.class, new ForeignKeyModels.Blog(index, index+"name " + index + "last", authors.get(index-1)), d);
                }
                return null;
            });

            assertEquals(10, list[0].size());
            assertEquals(2, calls[0]); // 1 for initial, 1 for batch of changes
            return null;
        });
    }
}
